60b59ad7a8a0ab9e84d2974241e8ff9f8748d391,tests/test_apps/adhoc-rejoin-consistency/src/AsyncBenchmark.java,AsyncBenchmark,runBenchmark,#,428
Before Change
// Remember which thread is executing the benchmark.
benchmarkThread = Thread.currentThread();
// Print the "Setup & Initialization" banner between two horizontal rules.
System.out.print(HORIZONTAL_RULE);
System.out.println(" Setup & Initialization");
System.out.println(HORIZONTAL_RULE);
// connect to one or more servers, loop until success
// first server in the list is a blessed node, we only connect to it
// second server in the list is the rejoinable node, we never connect
// to it in this part of the test
// config.servers is a comma-separated list; only its first entry is passed to connect().
connect(config.servers.split(",")[0]);
// Get the partition count by calling the @Statistics system procedure
// with the PARTITIONCOUNT selector.
ClientResponse resp = client.callProcedure("@Statistics", "PARTITIONCOUNT", 0);
VoltTable[] tpc = resp.getResults();
nPartitions=0;
// Column 0 of each row in the first result table is read as the count;
// if there are several rows, the last one wins.
while (tpc[0].advanceRow()) {
    nPartitions = (int) tpc[0].getLong(0);
}
System.out.printf("partition count: %d\n", nPartitions);
// The test refuses to run with fewer than two partitions.
if (nPartitions < 2) {
    // The original format string had no specifier, so the nPartitions
    // argument was silently dropped from the error message.
    System.err.printf("Less than 2 partitions (found %d)\n", nPartitions);
    System.exit(1);
}
// Call the Initialize procedure, passing it the partition count.
client.callProcedure("Initialize", nPartitions);
// Print the "Starting Benchmark" banner between two horizontal rules.
System.out.print(HORIZONTAL_RULE);
System.out.println("Starting Benchmark");
System.out.println(HORIZONTAL_RULE);
// print periodic statistics to the console
// benchmarkStartTS records the wall-clock start time in milliseconds.
benchmarkStartTS = System.currentTimeMillis();
schedulePeriodicStats();
// Run the benchmark loop for the requested duration
// The throughput may be throttled depending on client configuration
System.out.println("\nRunning benchmark...");
// config.duration is multiplied by 1000 to turn it into a millisecond deadline.
final long benchmarkEndTime = System.currentTimeMillis() + (1000l * config.duration);
// Operators spliced into the ad hoc UPDATE statements; both entries are
// currently "+" (the multiply/divide variant below is disabled).
//String qOps[] = {"*","/"};
String qOps[] = {"+","+"};
// Index (0 or 1) of the catalog last seen or installed by the
// UPDATEAPPLICATIONCATALOG test case in the loop below.
int lastCatalog = 0;
while (runBenchmark && (benchmarkEndTime > System.currentTimeMillis())) {
int r = rand.nextInt(2); // 50/50 multiply or divide operation
int p = rand.nextInt(nPartitions); //choose a counter
int c = rand.nextInt(1)+2; // values 2-3
Tests tc;
if (testCase == null) {
tc = Tests.values()[rand.nextInt(Tests.values().length)];
//System.err.printf("selected test: %s\n", tc);
}
else
tc = testCase;
totalAsync.getAndIncrement();
try {
switch (tc) {
case ADHOCSINGLEPARTPTN:
// single part adhoc query ENG-3886 also see ENG-4076
//System.err.printf("adhoc singlepart...\n");
client.callProcedure(new SequenceCallback(),
"@AdHoc", "UPDATE COUNTERS_PTN set COUNTER=COUNTER"+ qOps[r] + Integer.toString(c) + " WHERE id=" +
Integer.toString(p) + ";" );
totalAdHoc.getAndIncrement();
break;
case ADHOCMULTIPARTPTN:
// multipart adhoc query ENG-3887
//System.err.printf("adhoc multipart...\n");
client.callProcedure(new SequenceCallback(),
"@AdHoc", "UPDATE COUNTERS_PTN set COUNTER=COUNTER"+ qOps[r] + Integer.toString(c) +";");
totalAdHoc.getAndIncrement();
break;
case ADHOCSINGLEPARTREP:
// multipart adhoc query ENG-3887
//System.err.printf("adhoc multipart...\n");
client.callProcedure(new SequenceCallback(),
"@AdHoc", "UPDATE COUNTERS_REP set COUNTER=COUNTER"+ qOps[r] + Integer.toString(c) + " WHERE id=" +
Integer.toString(p) + ";" );
totalAdHoc.getAndIncrement();
break;
case ADHOCMULTIPARTREP:
// multipart adhoc query ENG-3887
//System.err.printf("adhoc multipart...\n");
client.callProcedure(new SequenceCallback(),
"@AdHoc", "UPDATE COUNTERS_REP set COUNTER=COUNTER"+ qOps[r] + Integer.toString(c) +";");
totalAdHoc.getAndIncrement();
break;
case UPDATEAPPLICATIONCATALOG:
// UpdateApplicationCatalog
// we want the update application catalog command to be issued during the rejoin
// but the client is async relative to killing and rejoining.
// also, the rejoin time will vary a lot depending on the nodes and sitesperhost.
// so long run times will be required to possibly hit the right timing.
// bottom line-this is not going to be a meaningful test when run for short durations.
ClientResponse response = null;
// Find out which catalog we are on
try {
response = client.callProcedure("@AdHoc", "Select count(*) from replicated;");
if (response.getStatus() == ClientResponse.SUCCESS) {
lastCatalog = 1;
} else {
lastCatalog = 0;
}
}
catch (ProcCallException e) {
// expect a planner exception on catalog 0
//e.printStackTrace();
lastCatalog = 0;
}
catch (Exception e) {
e.printStackTrace();
throw new RuntimeException();
}
// running ALL, we don't wait, otherwise go slow.
if (testCase != null) {
Thread.sleep(rand.nextInt(20000)+1); // really slow
}
// now, flip to the other catalog
// this runs as a synchronous tx (for now)
System.err.printf("updateapplicationcatalog %d...\n", lastCatalog);
// create catalog
String catPath = "/home/prosegay/branches/ENG-3884/voltdb/tests/test_apps/adhoc-rejoin-consistency";
File catalog_files[] = { new File(catPath + "/AdHocRejoinConsistency.jar"),
new File(catPath + "/AdHocRejoinConsistency2.jar") };
File file2 = new File(catPath + "/deployment.xml");
// Flip the catalog
lastCatalog = (lastCatalog+1) % 2;
response = client.updateApplicationCatalog(catalog_files[ lastCatalog ], file2);
if (response.getStatus() != ClientResponse.SUCCESS) {
System.err.printf("UAC operation failed with %s\n", response.getStatusString());
throw new RuntimeException();
} else {
successfulAsync.getAndIncrement();
// check if we're on the right catalog
try {
response = client.callProcedure("@AdHoc", "Select count(*) from replicated;");
switch (lastCatalog) {
case 0:
if (response.getStatus() == ClientResponse.SUCCESS) {
System.err.printf("unexpected result for catalog 0\n");
throw new RuntimeException();
}
break;
case 1:
if (response.getStatus() != ClientResponse.SUCCESS) {
System.err.printf("unexpected result for catalog 1\n");
throw new RuntimeException();
}
break;
default:
throw new RuntimeException("Invalid catalog switch value");
}
}
catch (ProcCallException e) {
if (lastCatalog != 0) {
e.printStackTrace();
System.err.printf("unexpected result for catalog 1 in proccallexception %d\n%s\n", lastCatalog,
e.getMessage());
throw new RuntimeException();
}
}
}
break;
case WRSINGLEPARTSTOREDPROCPTN:
// single-part stored procedure
client.callProcedure(new SequenceCallback(), "getNextFromPtn", p, nPartitions);
break;
case WRMULTIPARTSTOREDPROCPTN:
// multi-part stored procedure
// Updates a partitioned table
client.callProcedure(new SequenceCallback(), "MPUpdatePtn");
case WRMULTIPARTSTOREDPROCREP:
// multi-part stored procedure
// Updates a replicated table
client.callProcedure(new SequenceCallback(), "MPUpdateRep");
break;
case LOADSINGLEPARTITIONTABLEPTN: // this case is failing
break;
case LOADMULTIPARTITIONTABLEREP:
// LoadSinglePartitionTable LoadMultiPartitionTable ENG-3885 part 1 of 2
// voltLoadTable is not client exposed
// voltLoadTable is used for the initial load on DR
// Get all the rows from the counter table and insert them into the
// like_counter table, then compare both copies of the target table after rejoin
response = null;
try {
response = client.callProcedure("getRowFromPtn", p);
if (response.getStatus() != ClientResponse.SUCCESS) {
System.err.printf("FATAL Unexpectd result getting source row %s\n",
response.getStatusString());
throw new RuntimeException();
}
}
catch (ProcCallException e) {
//e.printStackTrace();
System.err.printf("unexpected exception getting source row\n %s\n", e.getMessage());
}
VoltTable vt[] = response.getResults();
if ( vt.length == 0 ) {
System.err.printf("FATAL VoltTable[] object has no elememts\n");
throw new RuntimeException();
}
if ( vt[0].getRowCount() != 1 ) {
System.err.printf("FATAL VoltTable object has wrong number of rows %d\n", vt[0].getRowCount());
throw new RuntimeException();
}
VoltTable vt0 = vt[0];
// insert row into target table
try {
switch (tc) {
case LOADSINGLEPARTITIONTABLEPTN:
client.callProcedure(new SequenceCallback(),
"@LoadSinglepartitionTable", "LIKE_COUNTERS_PTN", vt0);
break;
case LOADMULTIPARTITIONTABLEREP:
client.callProcedure(new SequenceCallback(),
"@LoadMultipartitionTable", "LIKE_COUNTERS_REP", vt0);
break;
}
}
catch (VoltAbortException e) {
System.err.printf("FATAL Load single/multi table failed with an exception\n%s\n", e.getMessage());
throw new RuntimeException();
}
break;
default:
throw new RuntimeException("Invalid query selector switch value: '" + tc + "'");
}
}
catch (NoConnectionsException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
catch (InterruptedException e) {
e.printStackTrace();
System.err.printf("Caught InterrruptedException: %s\ntoString: %s\n", e.getMessage(), e.toString());
//throw new RuntimeException(e);
}
catch (IOException e) {
e.printStackTrace();
System.err.printf("Caught IOException: %s\ntoString: %s\n", e.getMessage(), e.toString());
//throw new RuntimeException(e);
}
catch (Exception e) {
e.printStackTrace();
System.err.printf("Caught Exception: %s\ntoString: %s\n", e.getMessage(), e.toString());
throw new RuntimeException(e);
}
Thread.yield();
} // while
// cancel periodic stats printing
timer.cancel();
try {
// block until all outstanding txns return
System.err.println("draining connection...");
client.drain();
}
After Change
}
catch (IOException e) {
logStackTrace(e);
log.error(_F("Caught IOException: %s\ntoString: %s\n", e.getMessage(), e.toString()));
//throw new RuntimeException(e);
}
catch (Exception e) {